1 What is Apache Spark

2 The First Abstraction of Spark

3 The Second Abstraction of Spark

4 Two Types of RDDs

5 RDD Operations

5.1 RDD Transformations

  • All transformations are lazy: they do not compute their results right away; instead, they remember the transformations applied to some base dataset. The transformations are computed only when an action requires a result to be returned to the driver program. This design lets Spark run more efficiently.
  • By default, each transformed RDD is recomputed each time you run an action on it. However, you may also persist an RDD in memory or on disk, or replicate it across the cluster.
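Because the document's examples require a cluster, but laziness itself is easy to demonstrate without one, here is a plain-Python sketch of the same idea using a lazy generator; `expensive` and `calls` are illustrative names, not Spark APIs:

```python
# A generator expression is lazy, like an RDD transformation:
# no work happens until something (the "action") consumes it.
calls = []

def expensive(x):
    calls.append(x)   # record that the function actually ran
    return x * 2

lazy = (expensive(x) for x in [1, 2, 3])   # "transformation": nothing runs
assert calls == []                          # no computation yet

result = list(lazy)                         # "action": forces evaluation
assert result == [2, 4, 6]
assert calls == [1, 2, 3]
```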

5.2 A List of RDD Transformation Functions

  • map(func)
  • filter(func)
  • flatMap(func)
  • mapPartitions(func)
  • mapPartitionsWithIndex(func)
  • sample(withReplacement, fraction, seed)
  • union(otherDataset)
  • groupByKey([numTasks])
  • reduceByKey(func, [numTasks])
  • sortByKey([ascending], [numTasks])
  • join(otherDataset, [numTasks])
  • cogroup(otherDataset, [numTasks])

5.3 A List of RDD Action Functions

  • reduce(func)
  • collect()
  • count()
  • first()
  • take(n)
  • takeSample(withReplacement, num, seed)
  • countByKey()
  • foreach(func)
  • saveAsTextFile(path)
  • saveAsSequenceFile(path)

6 RDD Persistence

7 PySpark

8 PySpark Public Classes

9 SparkContext Architecture

10 Class PySpark.SparkContext

10.1 accumulator(value, accum_param=None)

  • Creates an Accumulator with the given initial value, using an optional AccumulatorParam helper object to define how values of the given data type are added.
>>> accum = sc.accumulator(0)
>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
>>> accum.value
10

10.2 parallelize(c, numSlices=None)

  • Distributes a local Python collection to form an RDD. For performance, using xrange is recommended if the input represents a range.
>>> sc.parallelize([1, 2, 3, 4, 5], 4).glom().collect()
[[1], [2], [3], [4, 5]]
>>> sc.parallelize(xrange(0, 6, 2), 3).glom().collect()
[[0], [2], [4]]

10.3 textFile(name, minPartitions=None, use_unicode=True)

  • Reads a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and returns it as an RDD of Strings.
>>> textline = sc.textFile('/poem.txt') # from HDFS
>>> textline.collect()
[u'There is Another Sky',
 u'',
 u'Emily Dickinson',
 u'',
 u'There is another sky,',
 u'Ever serene and fair,',
 u'And there is another sunshine,',
 u'Though it be darkness there;',
 u'Never mind faded forests, Austin,',
 u'Never mind silent fields -',
 u'Here is a little forest,',
 u'Whose leaf is ever green;',
 u'Here is a brighter garden,',
 u'Where not a frost has been;',
 u'In its unfading flowers',
 u'I hear the bright bee hum:',
 u'Prithee, my brother,',
 u'Into my garden come!']
>>> textline = sc.textFile('/poem.txt', use_unicode=False)
>>> textline.collect()
['There is Another Sky',
 '',
 'Emily Dickinson',
 '',
 'There is another sky,',
 'Ever serene and fair,',
 'And there is another sunshine,',
 'Though it be darkness there;',
 'Never mind faded forests, Austin,',
 'Never mind silent fields -',
 'Here is a little forest,',
 'Whose leaf is ever green;',
 'Here is a brighter garden,',
 'Where not a frost has been;',
 'In its unfading flowers',
 'I hear the bright bee hum:',
 'Prithee, my brother,',
 'Into my garden come!']

11 PySpark RDD Methods and Functions

12 Class PySpark.RDD

12.1 cogroup(other, numPartitions=None)

  • For each key k in self or other, returns a resulting RDD that contains a tuple with the list of values for that key in self as well as other.
>>> x = sc.parallelize([("a", 10), ("b", 20)])
>>> y = sc.parallelize([("b", 30)])
>>> [(k, tuple(map(list, v))) for k, v in sorted(list(x.cogroup(y).collect()))]
[('a', ([10], [])), ('b', ([20], [30]))]

12.2 collect()

  • Returns a list that contains all of the elements in this RDD.
>>> sc.range(5).collect()
[0, 1, 2, 3, 4]

12.3 count()

  • Returns the number of elements in this RDD.
>>> sc.parallelize([1, 2, 3, 4, 5, 6]).count()
6

12.4 countByKey()

  • Counts the number of elements for each key.
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("c", 1)])
>>> sorted(rdd.countByKey().items())
[('a', 2), ('b', 1), ('c', 1)]

12.5 countByValue()

  • Returns the count of each unique value in this RDD as (value, count) pairs.
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("c", 1)])
>>> sorted(rdd.countByValue().items())
[(('a', 1), 2), (('b', 1), 1), (('c', 1), 1)]

12.6 distinct(numPartitions=None)

  • Returns a new RDD containing the distinct elements in this RDD.
>>> sorted(sc.parallelize([1, 1, 1, 2, 3, 4, 3]).distinct().collect())
[1, 2, 3, 4]

12.7 filter(f)

  • Returns a new RDD containing only the elements that satisfy a predicate.
>>> rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
>>> rdd1.filter(lambda x: x % 2 == 0).collect()
[2, 4, 6, 8, 10]

12.8 first()

  • Returns the first element in this RDD.
>>> sc.parallelize([10, 20, 30]).first()
10

12.9 map(f, preservesPartitioning=False)

  • Returns a new RDD by applying a function to each element of this RDD.
>>> rdd2 = sc.parallelize(["1", "2", "3"])
>>> sorted(rdd2.map(lambda x: x*10).collect())
['1111111111', '2222222222', '3333333333']

12.10 flatMap(f, preservesPartitioning=False)

  • Returns a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
>>> rdd = sc.parallelize([5, 6, 7])
>>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
[1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 6]
>>> sorted(rdd.flatMap(lambda x: [(x, x+1), (x+1, x)]).collect())
[(5, 6), (6, 5), (6, 7), (7, 6), (7, 8), (8, 7)]

12.11 foreach(f)

  • Applies a function to all elements of this RDD.
>>> accum = sc.accumulator(0)
>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
>>> accum.value
10

12.12 glom()

  • Returns an RDD created by coalescing all elements within each partition into a list.
>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> sorted(rdd.glom().collect())
[[1, 2], [3, 4]]

12.13 histogram(buckets)

  • Computes a histogram using the buckets, which are all open to the right except for the last, which is closed; e.g., [1,10,20,50] means the buckets are [1,10), [10,20), and [20,50].
>>> rdd = sc.parallelize([5, 18])
>>> rdd.histogram([1, 10, 20, 50])
([1, 10, 20, 50], [1, 1, 0])
  • Buckets must be sorted, must not contain duplicates, and must have at least two elements.
  • If buckets is a number, it generates that many buckets, evenly spaced between the minimum and maximum of the RDD.
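The bucket rules above (right-open intervals, closed last bucket) can be reproduced in plain Python with bisect; this is a sketch of the semantics, not Spark's implementation:

```python
import bisect

buckets = [1, 10, 20, 50]            # [1,10), [10,20), [20,50]
values = [5, 18]
counts = [0] * (len(buckets) - 1)
for v in values:
    if buckets[0] <= v <= buckets[-1]:
        # bisect_right finds the right-open bucket; min() clamps the
        # closed last bucket so v == 50 still lands in [20, 50].
        i = min(bisect.bisect_right(buckets, v) - 1, len(counts) - 1)
        counts[i] += 1
assert counts == [1, 1, 0]           # same counts rdd.histogram returned
```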

12.14 reduce(f)

  • Reduces the elements of this RDD using the specified commutative and associative binary operator.
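With a live SparkContext this would be e.g. `sc.parallelize([1, 2, 3, 4, 5]).reduce(lambda a, b: a + b)`; the same pairwise fold in plain Python, for readers without a cluster at hand:

```python
from functools import reduce

# Fold the elements with a commutative, associative operator,
# mirroring rdd.reduce(lambda a, b: a + b).
total = reduce(lambda a, b: a + b, [1, 2, 3, 4, 5])
assert total == 15
```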

12.15 intersection(other)

  • Returns the intersection of this RDD and another RDD. The output does not contain duplicate elements.
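The duplicate-free result can be sketched with Python sets (a semantic analogy, not Spark's distributed implementation):

```python
# intersection() drops duplicates, like set intersection.
a = [1, 2, 2, 3, 4]
b = [3, 4, 4, 5]
common = sorted(set(a) & set(b))
assert common == [3, 4]
```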

12.16 union(other)

  • Returns the union of this RDD and another one.
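Note that, unlike intersection(), Spark's union() keeps duplicates (apply distinct() afterwards to remove them); in plain Python it behaves like list concatenation:

```python
# union() is concatenation: duplicates survive.
a = [1, 2, 3]
b = [3, 4]
merged = sorted(a + b)
assert merged == [1, 2, 3, 3, 4]
```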

12.17 leftOuterJoin(other, numPartitions=None)

  • Performs a left outer join of self and other.

12.18 rightOuterJoin(other, numPartitions=None)

  • Performs a right outer join of self and other.
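Both joins operate on RDDs of (key, value) pairs, with None filling in for missing matches. A minimal plain-Python sketch of the left outer case (the right outer case is symmetric):

```python
left  = [("a", 1), ("b", 2)]
right = [("a", 3)]

# Index the right side by key, then emit one pair per left element;
# keys absent on the right produce a None placeholder.
rmap = {}
for k, v in right:
    rmap.setdefault(k, []).append(v)

joined = sorted((k, (v, r)) for k, v in left
                for r in rmap.get(k, [None]))
assert joined == [("a", (1, 3)), ("b", (2, None))]
```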

12.19 max(key=None)

  • Finds the maximum item in this RDD.

12.20 min(key=None)

  • Finds the minimum item in this RDD.
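Both methods accept an optional key function used for comparison; the behavior matches Python's built-ins, which is an easy way to see what key= does:

```python
data = [10, 5, 20]
assert max(data) == 20
assert min(data) == 5

# With key=str the comparison is lexicographic on the string form,
# so "5.0" beats "43.0" (this mirrors rdd.max(key=str)).
biggest = max([1.0, 5.0, 43.0, 10.0], key=str)
assert biggest == 5.0
```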

12.21 mean()

  • Computes the mean of this RDD’s elements.

12.22 stdev()

  • Computes the standard deviation of this RDD’s elements.

12.23 stats()

  • Returns a StatCounter object that captures the mean, variance and count of the RDD’s elements in one operation.

12.24 sum()

  • Adds up the elements in this RDD.
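One detail worth knowing: Spark's stdev() is the population standard deviation (dividing by N, not N-1). The plain-Python equivalents, using the standard library:

```python
import statistics

data = [1.0, 2.0, 3.0, 4.0]
assert sum(data) == 10.0
assert statistics.mean(data) == 2.5
# statistics.pstdev (population) matches Spark's stdev();
# statistics.stdev (sample, N-1) would give a larger value.
assert abs(statistics.pstdev(data) - 1.1180339887498949) < 1e-12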

12.25 Take(N)

  • Takes the first N elements of the RDD as a list.

12.26 takeOrdered(N, key=None)

  • Gets the N elements from a RDD ordered in ascending order or as specified by the optional key function.

12.27 top(N, key=None)

  • Gets the top N elements from a RDD. It returns the list sorted in descending order.

12.28 zip(other RDD)

  • Zips this RDD with another one, returning keyvalue pairs with the first element in each RDD second element in each RDD, etc. Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition.

12.29 zipWithIndex()

  • Zips this RDD with its element indices.

12.30 reduceByKey(func, numPartitions=None)

  • Merges the values for each key using an associative reduce function. This also performs the merging locally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce. Output is partitioned with numPartitions partitions, or the default parallelism level if numPartitions is not specified.

13 textFile() and reduceByKey()

14 K-Means Clustering